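The actor model is a conceptual model for handling concurrent computation. Its core idea is that independent actors, each owning its own private state, interact only by sending messages to one another, and each actor processes the messages it receives one at a time.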
We can start the Rust project with the following command (for now we simply name it tyactor, which stands for typed-actor):
cargo new --lib tyactor
But this only creates a project with a library file. We need some way to run it as well, so let's add a binary file to it, inside the folder app:
mkdir app
touch app/main.rs
Now change Cargo.toml accordingly, so that we expose the library and can also run an example that uses the library functions with the cargo command itself:
[lib]
name = "tyactor"
path = "src/lib.rs"
[[bin]]
name = "example"
path = "app/main.rs"
Now, to run the code inside app/main.rs, we only need to run the command
cargo run --bin example
Consider the following code for creating an mpsc::channel (tokio has to be listed as a dependency first, for example via cargo add tokio --features full):
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // mpsc::channel returns a Sender (tx1) and a Receiver (rx); the
    // Receiver is bound as `mut` because `recv` takes `&mut self`
    let (tx1, mut rx) = mpsc::channel(32);

    // we clone the Sender to simulate sending data from two different
    // sources; both senders feed the same receiver
    let tx2 = tx1.clone();

    // spawns an async tokio task that sends the data to the receiver.
    // `tx1.send()` returns a future which we `await` to see whether the
    // value was sent; we may have to wait here if the receiver queue is
    // full. the `move` keyword means that `tx1` is moved into the task
    // and is dropped when the block finishes executing, i.e. once the
    // data has been sent
    tokio::spawn(async move {
        if tx1.send(3).await.is_err() {
            println!("the receiver dropped");
        }
    });
    tokio::spawn(async move {
        if tx2.send(4).await.is_err() {
            println!("the receiver dropped");
        }
    });

    // `rx.recv().await` yields `Some(value)` for each queued message and
    // waits while the queue is empty but a Sender is still alive. once
    // both Senders are dropped and the queue is drained it returns `None`
    // and the loop ends
    while let Some(msg) = rx.recv().await {
        println!("Got {:?}", msg);
    }
}
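The closing behaviour described in the comments can also be tried without an async runtime. The sketch below is only an analogue of the tokio code, not the same API: it swaps in std::sync::mpsc and OS threads so that it compiles without dependencies, and the function name collect_from_two_senders is made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: sends one value from each of two senders and
/// collects everything the receiver sees until the channel closes.
pub fn collect_from_two_senders() -> Vec<i32> {
    // note: std's channel is unbounded, unlike tokio's mpsc::channel(32)
    let (tx1, rx) = mpsc::channel::<i32>();
    let tx2 = tx1.clone();

    // each thread takes ownership of its sender and drops it on exit
    let t1 = thread::spawn(move || {
        tx1.send(3).expect("receiver dropped");
    });
    let t2 = thread::spawn(move || {
        tx2.send(4).expect("receiver dropped");
    });

    // iterating the receiver ends once every sender has been dropped,
    // which mirrors `rx.recv().await` returning `None` in tokio
    let mut got: Vec<i32> = rx.iter().collect();

    t1.join().unwrap();
    t2.join().unwrap();

    // arrival order depends on scheduling, so sort before returning
    got.sort();
    got
}
```

If one of the senders were kept alive in the calling function, the collect call would block forever, which is the same reason the tokio loop only ends after both tasks have finished.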
mpsc::Channel
Let's use the previous ideas to write two structs: an Actor, which is responsible for handling the messages sent to it, and an ActorHandle, which can be used to send messages to the Actor. This code is inspired by Actors with Tokio.
use tokio::sync::mpsc;

/// The Actor struct, responsible for receiving messages and handling them.
/// The actor may have a state that the messages can affect.
pub struct Actor<M: Send + 'static, S: Default + Send> {
    // the receiving half of the mpsc::channel, used to receive messages
    receiver: mpsc::Receiver<M>,
    // the state of the actor, which the handle function can modify
    state: S,
    // the state is borrowed mutably so that the function may modify it
    handle: fn(&mut S, M),
}

impl<M: Send + 'static, S: Default + Send> Actor<M, S> {
    pub fn new(rx: mpsc::Receiver<M>, f: fn(&mut S, M)) -> Self {
        Actor {
            receiver: rx,
            state: S::default(),
            handle: f,
        }
    }

    // runs until every sender (handle) has been dropped
    pub async fn start(mut self) {
        while let Some(msg) = self.receiver.recv().await {
            (self.handle)(&mut self.state, msg);
        }
    }
}

/// ActorHandle can be used to send messages to the respective actor.
#[derive(Clone)]
pub struct ActorHandle<M: Send + 'static> {
    // the sending half of the mpsc::channel
    id: mpsc::Sender<M>,
}

// the `'static` bound has to match the one on the struct definition
impl<M: Send + 'static> ActorHandle<M> {
    // creating a new actor only returns the handle: during its creation
    // we launch the actor and create a handle to it as well
    pub fn new<S: Default + Send + 'static>(size: usize, f: fn(&mut S, M)) -> Self {
        let (tx, rx) = mpsc::channel(size);
        let actor: Actor<M, S> = Actor::new(rx, f);
        tokio::spawn(async move {
            actor.start().await;
        });
        ActorHandle { id: tx }
    }

    // takes `&self` so the handle can be reused for further messages
    pub async fn send(&self, msg: M) {
        if let Err(e) = self.id.send(msg).await {
            eprintln!("{:?}", e);
        }
    }
}
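One thing to watch for: #[derive(Clone)] on a generic struct adds an M: Clone bound, so a derived ActorHandle<M> is only cloneable when the message type is, even though the sender inside it can always be cloned. A hand-written impl avoids that. The sketch below shows the idea on a stand-in type; Handle, NotClone, and demo are hypothetical names, and it wraps std::sync::mpsc::Sender rather than tokio's sender so that it compiles without dependencies.

```rust
use std::sync::mpsc;

/// Stand-in for ActorHandle, wrapping a std sender instead of tokio's.
pub struct Handle<M> {
    id: mpsc::Sender<M>,
}

// Written by hand so that no `M: Clone` bound is required.
impl<M> Clone for Handle<M> {
    fn clone(&self) -> Self {
        Handle { id: self.id.clone() }
    }
}

impl<M> Handle<M> {
    /// Borrows the handle, so it stays usable after sending.
    pub fn send(&self, msg: M) {
        if let Err(e) = self.id.send(msg) {
            eprintln!("receiver dropped: {}", e);
        }
    }
}

/// A message type that deliberately does not implement Clone.
pub struct NotClone(pub u32);

/// Sends through the original handle and a clone, returns what arrived.
pub fn demo() -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let h1 = Handle { id: tx };
    let h2 = h1.clone(); // compiles even though NotClone is not Clone
    h1.send(NotClone(1));
    h2.send(NotClone(2));
    drop((h1, h2)); // close the channel so the iterator below ends
    rx.iter().map(|m| m.0).collect()
}
```

With the derived impl, the h1.clone() line would be rejected for NotClone; whether that matters depends on the message types the actors are meant to carry.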
Let's capture the idea of an Actor in a trait to make this process more straightforward:
pub trait ActorTrait {
    type State: Default + Send + 'static;
    type Message: Send + 'static;
    fn handle(state: &mut Self::State, msg: Self::Message);
}
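To make the trait concrete, here is a minimal sketch of one possible implementor. Counter and run_by_hand are hypothetical names that do not appear in the library; the trait is repeated so the block compiles by itself, and the handler is driven by a plain loop rather than by a running actor.

```rust
/// Trait repeated from the text so the block compiles by itself.
pub trait ActorTrait {
    type State: Default + Send + 'static;
    type Message: Send + 'static;
    fn handle(state: &mut Self::State, msg: Self::Message);
}

/// Hypothetical actor: keeps a running total of the numbers it receives.
pub struct Counter;

impl ActorTrait for Counter {
    type State = i64; // starts at 0 through Default
    type Message = i64;

    fn handle(state: &mut Self::State, msg: Self::Message) {
        *state += msg;
    }
}

/// Feeds messages to the handler one by one, the way an actor loop would.
pub fn run_by_hand(msgs: Vec<i64>) -> i64 {
    let mut state: <Counter as ActorTrait>::State = Default::default();
    for m in msgs {
        Counter::handle(&mut state, m);
    }
    state
}
```

Because handle is an associated function without a self parameter, Counter itself carries no data; all mutable data lives in the State type.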
The ActorHandle constructor then changes into:
pub fn new<A: ActorTrait<Message = M>>(size: usize) -> Self {
    let (tx, rx) = mpsc::channel(size);
    // the state type and the handler now both come from the trait
    let actor: Actor<M, A::State> = Actor::new(rx, A::handle);
    tokio::spawn(async move {
        actor.start().await;
    });
    ActorHandle { id: tx }
}
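The shape of a trait-driven constructor, including the turbofish at the call site, can be sketched without tokio. Everything below is an assumption made for illustration: spawn_actor, Logger, and demo are hypothetical names, std threads stand in for tokio tasks, and the join handle returning the final state is an addition that exists only to make the result observable.

```rust
use std::sync::mpsc;
use std::thread;

/// Trait repeated from the text so the block compiles by itself.
pub trait ActorTrait {
    type State: Default + Send + 'static;
    type Message: Send + 'static;
    fn handle(state: &mut Self::State, msg: Self::Message);
}

/// Hypothetical spawner: returns the sender plus a join handle that
/// yields the actor's final state once the channel is closed.
pub fn spawn_actor<A: ActorTrait + 'static>(
) -> (mpsc::Sender<A::Message>, thread::JoinHandle<A::State>) {
    let (tx, rx) = mpsc::channel::<A::Message>();
    let join = thread::spawn(move || {
        let mut state: A::State = Default::default();
        // the loop ends when every sender has been dropped
        for msg in rx {
            A::handle(&mut state, msg);
        }
        state
    });
    (tx, join)
}

/// Hypothetical actor that records every line it is sent.
pub struct Logger;

impl ActorTrait for Logger {
    type State = Vec<String>;
    type Message = String;

    fn handle(state: &mut Vec<String>, msg: String) {
        state.push(msg);
    }
}

pub fn demo() -> Vec<String> {
    // the actor type is chosen with a turbofish, no function pointer needed
    let (tx, join) = spawn_actor::<Logger>();
    tx.send("a".to_string()).expect("actor stopped");
    tx.send("b".to_string()).expect("actor stopped");
    drop(tx); // closing the channel lets the actor loop finish
    join.join().expect("actor panicked")
}
```

With the tokio version from the text, the equivalent call site would name the actor type in the same way when calling ActorHandle::new.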
This page is still a work in progress.